package net.dubboclub.http.netty;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.threadpool.ThreadPool;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import com.alibaba.dubbo.remoting.http.HttpHandler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.*;
import net.dubboclub.http.netty.servlet.NettyHttpServletRequest;
import net.dubboclub.http.netty.servlet.NettyHttpServletResponse;

import java.util.concurrent.*;

/**
 * @date: 2016/3/7.
 * @author:bieber.
 * @project:dubbo-side.
 * @package:com.alibaba.dubbo.remoting.http.netty.
 * @version:1.0.0
 * @fix:
 * @description: 该handler是所有channel共享的，所以同一时间会有多个channel在使用该实体，需要控制并发情况
 */
@ChannelHandler.Sharable
public class RestfulHttpHandler extends ChannelInboundHandlerAdapter {


    private HttpHandler httpHandler;

    private ExecutorService executorService;

    private static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));

    private static final Logger logger = LoggerFactory.getLogger(RestfulHttpHandler.class);

    //记录所有channel请求对应的request实体，当接收最后的内容片段之后，会将这个channel的信息从该map中移除
    private static ConcurrentHashMap<Channel, NettyHttpServletRequest> channelRequestMapping = new ConcurrentHashMap<Channel, NettyHttpServletRequest>();



    public RestfulHttpHandler(HttpHandler httpHandler, URL url) {
        this.httpHandler = httpHandler;
        this.executorService = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
    }


    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        logger.debug("Channel [" + ctx.channel() + "] unregistered.");
        if (NettyHttpServer.getChannelMapping().containsKey(ctx.channel())) {
            NettyHttpServer.getChannelMapping().remove(ctx.channel());
        }
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        logger.debug("Channel [" + ctx.channel() + "] registered.");
        ChannelHolder channelHolder = new ChannelHolder(ctx.channel());
        NettyHttpServer.getChannelMapping().putIfAbsent(ctx.channel(), channelHolder);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //请求第一次进来的时候
        if (msg.getClass() == DefaultHttpRequest.class) {//new request coming
            NettyHttpServletRequest request = new NettyHttpServletRequest((DefaultHttpRequest) msg, ctx.channel());
            channelRequestMapping.putIfAbsent(ctx.channel(), request);
        } else if (channelRequestMapping.containsKey(ctx.channel()) && msg.getClass() == DefaultHttpContent.class) {//内容片段
            offerContent(ctx.channel(), (HttpContent) msg, false);
        } else if (channelRequestMapping.containsKey(ctx.channel()) && LastHttpContent.class.isAssignableFrom(msg.getClass())) {//最后一个内容片段
            offerContent(ctx.channel(), (HttpContent) msg, true);
            channelRequestMapping.remove(ctx.channel());
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //更新该channel的最后写时间
        if(NettyHttpServer.getChannelMapping().containsKey(ctx.channel())){
            NettyHttpServer.getChannelMapping().get(ctx.channel()).read();
        }
    }

    /**
     * 将一个请求的内容片段收集在一起，然后进行处理
     *
     * @param channel
     * @param content
     * @param isLast
     */
    private void offerContent(Channel channel, HttpContent content, boolean isLast) {
        try{
            final NettyHttpServletRequest request = channelRequestMapping.get(channel);
            request.readContent(content);
            if (isLast) {//最后一个内容接收完毕，那么可以对该请求进行处理
                ExecutorService executorService = getExecutorService();
                final NettyHttpServletResponse response = new NettyHttpServletResponse(request.getHttpRequest(), NettyHttpServer.getChannelMapping().get(channel));
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            httpHandler.handle(request, response);
                        } catch (Throwable e) {
                            response.exception(e);
                        } finally {
                            request.completed();
                            response.send();
                        }
                    }
                });
            }
        }finally {
            assert content.refCnt()==1;
            content.release();
        }
    }

    private ExecutorService getExecutorService() {
        ExecutorService cexecutor = executorService;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("Channel "+ctx.channel()+" occur an exception.",cause);
    }
}
